package com.ctw.tinyservices.id.core.segment.generator;

import com.ctw.tinyservices.id.common.entity.IdSegment;
import com.ctw.tinyservices.id.common.enums.state.IdLifecycleState;
import com.ctw.tinyservices.id.common.exception.IdStateException;
import com.ctw.tinyservices.id.common.utils.LogUtils;
import com.ctw.tinyservices.id.core.segment.service.IdSegmentService;
import com.ctw.tinyservices.id.core.AbstractIdGenerator;
import com.ctw.tinyservices.id.core.segment.buffer.Segment;
import com.ctw.tinyservices.id.core.segment.buffer.SegmentBuffer;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
 * @author TongWei.Chen 2022/3/4 13:40
 *
 * Segment-mode id generator.
 *
 * <p>Ids are handed out from an in-memory {@link SegmentBuffer} kept per bizType. Each buffer
 * holds a current segment and a "next" segment; the next segment is refilled from the database
 * on a background thread while the current one is still being consumed.
 **/
public class IdSegmentGenerator extends AbstractIdGenerator {
    /**
     * Double-buffer cache, keyed by bizType.
     */
    private final Map<String /* bizType */, SegmentBuffer> segmentBufferMap = new ConcurrentHashMap<>();

    /**
     * Upper bound for the dynamically adjusted step (1,000,000).
     */
    private static final int MAX_STEP = 100_0000;
    /**
     * Target lifetime of one segment: 30 minutes. Used by the dynamic-step calculation.
     */
    private static final long SEGMENT_DURATION = 30 * 60 * 1000L;

    /**
     * Scheduler for the periodic cache refresh; stays {@code null} until {@link #doStart()} runs.
     */
    private ScheduledExecutorService scheduledExecutorService = null;

    /**
     * Pool that runs the asynchronous "fill the next segment" tasks.
     */
    private final ExecutorService threadPool = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new UpdateThreadFactory());

    /**
     * Thread factory that names its threads {@code Thread-Segment-Update-N}, where N is a
     * counter shared by all factory instances.
     */
    public static class UpdateThreadFactory implements ThreadFactory {
        private static int threadInitNumber = 0;

        private static synchronized int nextThreadNum() {
            return threadInitNumber++;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());
        }
    }

    private final IdSegmentService idSegmentService;

    /**
     * @param idSegmentService service used to read segment definitions and to advance max ids
     */
    public IdSegmentGenerator(IdSegmentService idSegmentService) {
        this.idSegmentService = idSegmentService;
    }

    /**
     * Returns the next id for the given bizType.
     *
     * @param bizType business type whose sequence should be advanced
     * @return the next id, or {@code null} when the bizType has no buffer or no id could be obtained
     * @throws IdStateException when the generator is not in the {@code START} state
     */
    @Override
    public Long getId(String bizType) {
        if (state != IdLifecycleState.START) {
            LogUtils.error(IdSegmentGenerator.class, "Init buffer error， please see doStart method log, bizType is [{}]", bizType);
            throw new IdStateException("Init segment error， please see doStart method log");
        }
        // Single lookup: the scheduled refresh may remove the entry concurrently, so a
        // containsKey() followed by get() could hand a null buffer to the caller below.
        SegmentBuffer buffer = segmentBufferMap.get(bizType);
        if (buffer != null) {
            return getIdFromSegmentBuffer(buffer);
        }
        LogUtils.warn(IdSegmentGenerator.class, "[{}] not in buffer or db", bizType);
        return null;
    }

    /**
     * Loads the initial set of buffers from the database.
     */
    @Override
    protected void doInit() {
        LogUtils.info(IdSegmentGenerator.class, "SegmentGenerator doInit data begin...");
        updateCacheFromDb();
        LogUtils.info(IdSegmentGenerator.class, "SegmentGenerator doInit data end...");
    }

    /**
     * Starts the periodic cache refresh.
     */
    @Override
    protected void doStart() {
        LogUtils.info(IdSegmentGenerator.class, "SegmentGenerator doStart data begin...");
        // Refresh the cache once per minute.
        updateCacheFromDbAtEveryMinute();
        LogUtils.info(IdSegmentGenerator.class, "SegmentGenerator doStart data end...");
    }

    /**
     * Shuts down the refill pool (waiting up to 60s, then interrupting and waiting another 60s)
     * and then stops the refresh scheduler if it was ever started.
     */
    @Override
    protected void doStop() {
        LogUtils.info(IdSegmentGenerator.class, "SegmentGenerator doStop data begin...");
        threadPool.shutdown(); // No new tasks can be submitted from here on.
        try {
            // Wait for in-flight tasks to finish.
            if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                threadPool.shutdownNow(); // Cancel the tasks that are still running.
                LogUtils.warn(IdSegmentGenerator.class, "Interrupt the worker, which may cause some task inconsistent. Please check the biz logs.");
                // Wait for the tasks to respond to cancellation.
                if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                    LogUtils.error(IdSegmentGenerator.class, "Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs.");
                }
            }
        } catch (InterruptedException ie) {
            // The stopping thread itself was interrupted: cancel the workers right away.
            threadPool.shutdownNow();
            LogUtils.error(IdSegmentGenerator.class, "The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconcistent state. Please check the biz logs.");
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        // The scheduler only exists after doStart(); stopping an unstarted generator must not NPE.
        if (scheduledExecutorService != null) {
            scheduledExecutorService.shutdownNow();
        }
        LogUtils.info(IdSegmentGenerator.class, "SegmentGenerator doStop data end...");
    }

    @Override
    public String name() {
        return "segment";
    }

    /**
     * Synchronizes {@link #segmentBufferMap} with the biz types returned by
     * {@code idSegmentService.list()}: new biz types get an initialized buffer, biz types no
     * longer returned are removed. Any exception is logged and swallowed so that a failing
     * run does not stop the periodic schedule.
     */
    private void updateCacheFromDb() {
        LogUtils.info(IdSegmentGenerator.class, "Segment updateCacheFromDb begin...");
        try {
            List<IdSegment> idSegmentList = idSegmentService.list();
            if (null == idSegmentList || idSegmentList.isEmpty()) {
                LogUtils.warn(IdSegmentGenerator.class, "Segment all biz_type is empty");
                return;
            }
            // Add biz types that exist in the db but are not cached yet.
            for (IdSegment idSegment : idSegmentList) {
                if (!segmentBufferMap.containsKey(idSegment.getBizType())) {
                    SegmentBuffer buffer = new SegmentBuffer();
                    buffer.setBizType(idSegment.getBizType());
                    // Fill the current segment before publishing the buffer in the map.
                    fillSegment(buffer.getCurrent(), fillSegmentBuffer(idSegment.getBizType(), buffer.getCurrent()).getMaxId());

                    segmentBufferMap.put(idSegment.getBizType(), buffer);
                    LogUtils.info(IdSegmentGenerator.class, "Segment updateCacheFromDb Add biz_type [{}] from db to segmentBufferMap, SegmentBuffer [{}]", idSegment.getBizType(), buffer);
                }
            }
            // Drop cached biz types that were deleted from the db in the meantime.
            List<String> cacheBizTypeList = new ArrayList<>(segmentBufferMap.keySet());
            Set<String> idSegmentSet = idSegmentList.stream().map(IdSegment::getBizType).collect(Collectors.toSet());
            for (String cacheBizType : cacheBizTypeList) {
                if (!idSegmentSet.contains(cacheBizType)) {
                    segmentBufferMap.remove(cacheBizType);
                    LogUtils.info(IdSegmentGenerator.class, "Segment updateCacheFromDb Remove biz_type [{}] from segmentBufferMap", cacheBizType);
                }
            }
            LogUtils.info(IdSegmentGenerator.class, "Segment updateCacheFromDb success... segmentBufferMap is [{}]", segmentBufferMap);
        } catch (Exception e) {
            LogUtils.error(IdSegmentGenerator.class, "Segment updateCacheFromDb exception", e);
        }
    }

    /**
     * Creates a single daemon scheduler thread that runs {@link #updateCacheFromDb()} with a
     * fixed delay of 60 seconds, the first run being 60 seconds after this call.
     */
    private void updateCacheFromDbAtEveryMinute() {
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setName("check-segmentBufferMap-thread");
            t.setDaemon(true);
            return t;
        });
        scheduledExecutorService.scheduleWithFixedDelay(this::updateCacheFromDb, 60, 60, TimeUnit.SECONDS);
    }

    /**
     * Takes the next id from the given double buffer.
     *
     * <p>Fast path (under the buffer's rLock): increment the current segment's counter and
     * return it if it is still below the segment's max; along the way, kick off an asynchronous
     * refill of the next segment when it is not ready and the current segment's idle amount has
     * dropped below 90% of its step. Slow path (under the buffer's wLock): retry on the current
     * segment, otherwise switch to the next segment if it is ready and loop again.
     *
     * @param buffer double buffer of one bizType
     * @return the next id, or {@code null} when neither segment can serve an id or the
     *         generator is no longer in the {@code START} state
     */
    public Long getIdFromSegmentBuffer(SegmentBuffer buffer) {
        while (getState() == IdLifecycleState.START) {
            buffer.rLock().lock();
            try {
                final Segment segment = buffer.getCurrent();
                // The CAS on threadRunning guarantees at most one refill task per buffer at a time.
                if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
                    try {
                        threadPool.execute(() -> refillNextSegment(buffer));
                    } catch (RejectedExecutionException e) {
                        // The task will never run (e.g. pool already shut down), so nobody else
                        // would reset the flag; clear it before propagating the failure.
                        buffer.getThreadRunning().set(false);
                        throw e;
                    }
                }
                long value = segment.getValue().getAndIncrement();
                if (value < segment.getMax()) {
                    return value;
                }
            } finally {
                buffer.rLock().unlock();
            }
            // Current segment is exhausted: give a running refill a chance to finish.
            waitAndSleep(buffer);
            buffer.wLock().lock();
            try {
                // Another thread may already have switched segments; try the current one again.
                final Segment segment = buffer.getCurrent();
                long value = segment.getValue().getAndIncrement();
                if (value < segment.getMax()) {
                    return value;
                }
                if (buffer.isNextReady()) {
                    buffer.switchPos();
                    buffer.setNextReady(false);
                } else {
                    LogUtils.error(IdSegmentGenerator.class, "getIdFromSegmentBuffer Both two segments in [{}] are not ready!", buffer);
                    return null;
                }
            } finally {
                buffer.wLock().unlock();
            }
        }
        LogUtils.warn(IdSegmentGenerator.class, "server is closed, and state is [{}]", getState());
        return null;
    }

    /**
     * Background task body: loads the buffer's next segment from the db and, on success, marks
     * it ready under the wLock. The threadRunning flag is always cleared when the task ends.
     */
    private void refillNextSegment(SegmentBuffer buffer) {
        Segment nextSegment = buffer.getSegments()[buffer.nextPos()];
        boolean updateOk = false;
        try {
            updateSegmentFromDb(buffer.getBizType(), nextSegment);
            updateOk = true;
            LogUtils.info(IdSegmentGenerator.class, "getIdFromSegmentBuffer update segment [{}] from db [{}]", buffer.getBizType(), nextSegment);
        } catch (Exception e) {
            // Failing to fill the second segment is not fatal: the first one is still usable.
            LogUtils.warn(IdSegmentGenerator.class, "updateSegmentFromDb exception, bizType is [{}]", buffer.getBizType(), e);
        } finally {
            if (updateOk) {
                buffer.wLock().lock();
                try {
                    buffer.setNextReady(true);
                    buffer.getThreadRunning().set(false);
                } finally {
                    // Always release the write lock, even if a setter throws.
                    buffer.wLock().unlock();
                }
            } else {
                buffer.getThreadRunning().set(false);
            }
        }
    }

    /**
     * Fills the given segment from the db, using a dynamic step for already-initialized segments.
     *
     * <p>If the segment was refreshed less than {@link #SEGMENT_DURATION} ago the step is doubled
     * (unless that would exceed {@link #MAX_STEP}); if it was refreshed more than twice
     * {@link #SEGMENT_DURATION} ago the step is halved (unless that would fall below the
     * segment's min step).
     *
     * @param bizType business type
     * @param segment segment to fill
     */
    private void updateSegmentFromDb(String bizType, Segment segment) {
        IdSegment idSegment = fillSegmentBuffer(bizType, segment);
        if (null == idSegment) {
            // Segment was already initialized: compute the dynamic step.
            long duration = System.currentTimeMillis() - segment.getUpdateTimestamp();
            int nextStep = segment.getStep();
            if (duration < SEGMENT_DURATION) {
                nextStep = nextStep * 2 > MAX_STEP ? nextStep : nextStep * 2;
            }
            if (duration > SEGMENT_DURATION * 2) {
                nextStep = nextStep / 2 >= segment.getMinStep() ? nextStep / 2 : nextStep;
            }
            LogUtils.info(IdSegmentGenerator.class, "leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", bizType, segment.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
            idSegment = idSegmentService.updateMaxIdByStepAndGet(bizType, nextStep);
            doFillSegmentBuffer(segment, nextStep, idSegment.getStep());
        }
        fillSegment(segment, idSegment.getMaxId());
    }

    /**
     * Waits for a running refill task: spins while the buffer's threadRunning flag is set and,
     * after more than 10000 spins, sleeps 10ms once and returns regardless of the flag.
     */
    private void waitAndSleep(SegmentBuffer buffer) {
        int roll = 0;
        while (buffer.getThreadRunning().get()) {
            roll += 1;
            if (roll > 10000) {
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                    break;
                } catch (InterruptedException e) {
                    LogUtils.warn(IdSegmentGenerator.class, "Thread {} Interrupted",Thread.currentThread().getName());
                    // Restore the interrupt status so callers further up can observe it.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    /**
     * Initializes the segment from the db if it has not been initialized yet.
     *
     * @param bizType business type
     * @param segment segment to initialize
     * @return the db record used for initialization, or {@code null} if the segment was
     *         already initialized and nothing was done
     */
    public IdSegment fillSegmentBuffer(String bizType, Segment segment) {
        if (!segment.isInit()) {
            IdSegment idSegment = idSegmentService.updateMaxIdAndGet(bizType);
            doFillSegmentBuffer(segment, idSegment.getStep(), idSegment.getStep());
            segment.setInit(true);
            return idSegment;
        }
        return null;
    }

    /**
     * Positions the segment on the range {@code [maxId - step, maxId)}.
     *
     * @param segment segment to position; its step must already be set
     * @param maxId   exclusive upper bound of the range
     */
    public void fillSegment(Segment segment, long maxId) {
        // must set value before set max
        segment.getValue().set(maxId - segment.getStep());
        segment.setMax(maxId);
    }

    /**
     * Stores step and min step on the segment and stamps it with the current time.
     */
    private void doFillSegmentBuffer(Segment segment, int step, int minStep) {
        segment.setStep(step);
        segment.setMinStep(minStep);
        segment.setUpdateTimestamp(System.currentTimeMillis());
    }
}
